package cn.webank.framework.message.integration.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Repository;

import cn.webank.framework.dto.AsyncRMBMessage;
import cn.webank.framework.dto.SyncRMBMessage;
import cn.webank.framework.exception.SysException;
import cn.webank.framework.mapper.JsonMapper;
import cn.webank.framework.message.integration.RMBSao;
import cn.webank.rmb.api.RMB;
import cn.webank.rmb.api.RMBInternalErrorCodes;
import cn.webank.rmb.api.Request;
import cn.webank.rmb.api.Response;
import cn.webank.rmb.api.Util;
import cn.webank.rmb.core.intf.ISubscriber;
import cn.webank.rmb.exception.RMBIllegalAccessException;
import cn.webank.rmb.exception.RMBValidationException;
import cn.webank.rmb.message.AppHeader;
import cn.webank.rmb.message.Message;
import cn.webank.rmb.message.Ret;

@Repository("cn.webank.framework.message.integration.RMBSao")
public class RMBPojoSao implements RMBSao, InitializingBean {
	private final static Logger LOG = LoggerFactory.getLogger(RMBPojoSao.class);
	private final static String RMB_TIMEOUT_CODE = "00009001";
	private final static String RMB_TIMEOUT_MSG = "rmb timeout";

	private static JsonMapper jsonMapper = JsonMapper.nonDefaultMapper();

	@Override
	public <E, T> SyncRMBMessage<E, T> send(SyncRMBMessage<E, T> message,
			long timeout, TimeUnit unit) {

		Request simpleRequest = Util.createRequest(message.getRmbMessage(),
				unit.toMillis(timeout));

		Response response = null;
		try {
			response = RMB.sendRequest(simpleRequest);
		} catch (RMBValidationException | RMBIllegalAccessException e) {
			throw new SysException("rmb sendRequest error", e);
		}

		String errCode = response.getErrCode();
		if (RMBInternalErrorCodes.RQUEST_TIMEOUT.equals(errCode)) {
			message.setResponseObject(null);
			List<Ret> rets = new ArrayList<Ret>();
			Ret ret = new Ret();
			message.getSysHeader().getConsumerId();
			ret.setCode(RMB_TIMEOUT_CODE);// rmb没有系统id，默认0000
			ret.setMsg(RMB_TIMEOUT_MSG);
			rets.add(ret);
			message.setRetList(rets);
			message.setRetStatus(AppHeader.RET_STATUS_FAILURE);
		} else {
			Message retMessage = response.getMessage();
			LOG.debug("response message:" + jsonMapper.toJson(retMessage));
			message.setRmbMessage(retMessage);
		}

		return message;

	}

	@Override
	public <E> void publish(AsyncRMBMessage<E> message) {
		try {
			RMB.publish(message.getRmbMessage());
		} catch (RMBValidationException | RMBIllegalAccessException e) {
			throw new SysException("rmb publish error", e);
		}

	}

	@Override
	public void deregisterSubscriber(ISubscriber subscriber) {
		try {
			RMB.deregisterSubscriber(subscriber);
		} catch (RMBValidationException | RMBIllegalAccessException e) {
			throw new SysException("rmb deregisterSubscriber error", e);
		}
	}

	@Override
	public void registerSubscriber(ISubscriber subscriber) {
		try {
			RMB.registerSubscriber(subscriber);
		} catch (RMBValidationException | RMBIllegalAccessException e) {
			throw new SysException("rmb registerSubscriber error", e);
		}
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see
	 * org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		RMB.init();
	}

	@Override
	public void destroy() {
		try {
			RMB.destroy();
		} catch (Exception e) {

		}
	}
}
